!pip install pyyaml h5py # Required to save models in HDF5 format
import pickle
import numpy as np
import pandas as pd
np.random.seed(27)  # Fix the RNG seed so random sampling/plots below are reproducible
# Download data & unzip if it doesn't already exist
import os.path
from io import BytesIO
from urllib.request import urlopen
from zipfile import ZipFile
# Download and unzip data files (pickle files) from s3 bucket
data_path = 'data/'
if not os.path.exists(data_path):
    data_zip_url = 'https://s3-us-west-1.amazonaws.com/udacity-selfdrivingcar/traffic-signs-data.zip'
    # Fetch the whole zip into memory and extract it under data/
    with urlopen(data_zip_url) as zip_resp:
        with ZipFile(BytesIO(zip_resp.read())) as zfile:
            zfile.extractall(path=data_path)
# Using the data (pickled files) provided
training_file = 'data/train.p'
validation_file= 'data/valid.p'
testing_file = 'data/test.p'
# NOTE(review): pickle.load executes arbitrary code if the file is untrusted;
# acceptable here only because the source bucket is a known course dataset.
with open(training_file, mode='rb') as f:
    train = pickle.load(f)
with open(validation_file, mode='rb') as f:
    valid = pickle.load(f)
with open(testing_file, mode='rb') as f:
    test = pickle.load(f)
# Each pickle is a dict; 'features' holds the image pixel data and
# 'labels' the class ids (see the data description below).
X_train, y_train = train['features'], train['labels']
X_valid, y_valid = valid['features'], valid['labels']
X_test, y_test = test['features'], test['labels']
The pickled data is a dictionary with 4 key/value pairs:
- 'features' is a 4D array containing raw pixel data of the traffic sign images, (num examples, width, height, channels).
- 'labels' is a 1D array containing the label/class id of the traffic sign. The file signnames.csv contains id -> name mappings for each id.
- 'sizes' is a list containing tuples, (width, height), representing the original width and height of each image.
- 'coords' is a list containing tuples, (x1, y1, x2, y2), representing the coordinates of a bounding box around the sign in the image. THESE COORDINATES ASSUME THE ORIGINAL IMAGE. THE PICKLED DATA CONTAINS RESIZED VERSIONS (32 by 32) OF THESE IMAGES.
# Number of training examples
n_train = y_train.shape[0]
# Number of validation examples
n_validation = y_valid.shape[0]
# Number of testing examples.
n_test = y_test.shape[0]
# Shape of a single traffic sign image (height, width, channels)
image_shape = X_train.shape[1:]
# How many unique classes/labels there are in the dataset.
n_classes = np.unique(y_train).shape[0]
print("Number of training examples =", n_train)
# Report the validation count too -- it was computed but never printed
print("Number of validation examples =", n_validation)
print("Number of testing examples =", n_test)
print("Image data shape =", image_shape)
print("Number of classes =", n_classes)
import matplotlib.pyplot as plt
%matplotlib inline
def plot_random_sample_of_images(X, y, n_imgs=8, figsize=(1,5), figscale=15):
    '''Plot a grid with one row per unique label in `y`: the label text in
    the first column, followed by `n_imgs` randomly sampled images of that
    label.

    Parameters
    ----------
    X : array of images, indexed in parallel with `y`
    y : array of labels aligned with `X`
    n_imgs : number of randomly sampled images to show per label
    figsize, figscale : base figure size and the multiplier applied to it

    Returns
    -------
    The matplotlib Figure containing the grid.
    '''
    labels = np.unique(y)
    # Flat list of grid cells: each "row" is a label string then n_imgs images
    imgs_labels_in_order = []
    for label in labels:
        imgs_labels_in_order.append(f'Label #{label}')
        subset_of_X = X[y == label]
        # Random sample (without replacement) of up to n_imgs images;
        # the original sliced [:n_imgs] twice -- once is enough.
        sample_idx = np.random.permutation(subset_of_X.shape[0])[:n_imgs]
        imgs_labels_in_order.extend(subset_of_X[sample_idx])
    ##################
    figsize = (figsize[0]*figscale, figsize[1]*figscale)
    fig = plt.figure(figsize=figsize)
    grid = plt.GridSpec(
        labels.shape[0] + 1,
        n_imgs + 1,
        wspace=0.0,
        hspace=0.0
    )
    for i_img, img in enumerate(imgs_labels_in_order):
        # Each row has one extra (leading) cell reserved for the label text
        ax = fig.add_subplot(grid[i_img//(n_imgs+1), i_img % (n_imgs+1)])
        if not i_img % (1+n_imgs):
            # First column of the row: write the label
            ax.text(0.5, 0.5, img, fontsize=10, ha='center')
        else:
            ax.imshow(img)
        # Nice and neat; don't need numerical axes
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
    return fig
# Render a random sample grid of training images (one row per class)
fig_random_sample = plot_random_sample_of_images(X_train,y_train);
fig_random_sample.show()
From these randomly sampled images, we can see that many of the signs vary in brightness, which can definitely make classification more difficult. We also note that the signs tend to come in one of two shapes, a circle or a triangle, and that they generally use some combination of red, white, and blue. Lastly, many of the signs are nearly identical in color and shape and differ only in the icon within the sign.
# Histogram of the number of images per label in the training set
train_labels = pd.Series(data=train.get('labels'))
train_labels.hist(bins=train_labels.unique().shape[0])
plt.title('Distribution of training labels')
plt.show()
train_label_counts = train_labels.value_counts()
# Renamed to match the actual cutoff (was `count_gt_1950` with a 1900 threshold)
count_gt_1900 = train_label_counts[train_label_counts > 1900]
count_lt_300 = train_label_counts[train_label_counts < 300]
print(f'Number of labels w/ more than 1900 images: {count_gt_1900.shape[0]}')
print(f'Number of labels w/ less than 300 images: {count_lt_300.shape[0]}')
We can observe a significant class imbalance: some classes have approximately $2000$ images while eleven classes have fewer than $300$ images.
def get_stat_brightness_each_channel(image_array, label):
    '''Compute per-image brightness statistics and append the class label.

    For every image, the mean, variance, and median are computed first over
    all pixels (channels pooled) and then per channel, giving 12 statistics
    per image; `label` is appended as a final column.

    Parameters
    ----------
    image_array : array of shape (n_images, H, W, C)
    label : int class id attached to every row

    Returns
    -------
    Array of shape (n_images, 13) with columns
    [all_mean, all_var, all_median,
     mean_c0..c2, var_c0..c2, median_c0..c2, label].
    '''
    # Vectorized over the whole batch (replaces the per-image Python loop,
    # which built a list of 12-element arrays one image at a time).
    pooled_axes = (1, 2, 3)   # all pixels and channels of each image
    spatial_axes = (1, 2)     # pixels only, keeping the channel axis
    n_images = image_array.shape[0]
    full_stats_with_labels = np.column_stack((
        image_array.mean(axis=pooled_axes),
        image_array.var(axis=pooled_axes),
        np.median(image_array, axis=pooled_axes),
        image_array.mean(axis=spatial_axes),
        image_array.var(axis=spatial_axes),
        np.median(image_array, axis=spatial_axes),
        # Same label for every image in this batch
        label * np.ones((n_images, 1)),
    ))
    return full_stats_with_labels
def pipeline_img_stats(X, y):
    '''Build a DataFrame of per-image brightness statistics for every image
    in X, grouped and stacked in ascending label order, with the class label
    in the final column.
    '''
    stat_columns = [
        'channel_all_mean',
        'channel_all_var',
        'channel_all_median',
        'channel_0_mean',
        'channel_1_mean',
        'channel_2_mean',
        'channel_0_var',
        'channel_1_var',
        'channel_2_var',
        'channel_0_median',
        'channel_1_median',
        'channel_2_median',
        'label',
    ]
    # One stats array per label, in np.unique (sorted) label order
    per_label_stats = [
        get_stat_brightness_each_channel(X[y == label], label)
        for label in np.unique(y)
    ]
    return pd.DataFrame(
        data=np.concatenate(per_label_stats),
        columns=stat_columns,
    )
# Summary statistics for the training images (one row per image)
df_img_stats = pipeline_img_stats(X_train,y_train)
df_img_stats.head()
# Baseline: a random forest trained on the hand-crafted brightness statistics
import sklearn.ensemble
import sklearn.preprocessing
rf = sklearn.ensemble.RandomForestClassifier(
    random_state=0,
    max_depth=None,
    min_samples_split=20,
    min_samples_leaf=1,
)
X_img_stats_train = df_img_stats.iloc[:, :-1]  # statistic columns
y_img_stats_train = df_img_stats.iloc[:, -1]   # label column
rf.fit(X_img_stats_train, y_img_stats_train)
# Evaluate on the statistics of the test images
# (the original also built an unused OneHotEncoder / `y_ttest` / `differences`;
#  that dead code is removed here)
df_test = pipeline_img_stats(X_test, y_test)
X_img_stats_test = df_test.iloc[:, :-1]
y_hat = rf.predict(X_img_stats_test)
y_true = df_test.iloc[:, -1]
from sklearn.metrics import precision_score, recall_score, accuracy_score
print('Accuracy (not weighted):', accuracy_score(y_true, y_hat))
print('Precision (weighted):', precision_score(y_true, y_hat, average='weighted'))
# Labels below corrected to match the `average` argument actually passed
print('Precision (micro):', precision_score(y_true, y_hat, average='micro'))
print('Recall (micro):', recall_score(y_true, y_hat, average='micro'))
import seaborn as sns
from sklearn.metrics import confusion_matrix
def plot_cm(y_true, y_pred, figsize=(10,10), labels=None):
    '''Plot a confusion-matrix heatmap with per-row percentage annotations.

    Off-diagonal cells below 10% are left unannotated to reduce clutter.

    Parameters
    ----------
    y_true, y_pred : true and predicted class labels
    figsize : matplotlib figure size
    labels : explicit class-id ordering for the matrix rows/columns;
        defaults to the classes in the module-level `y_train` to preserve
        the original behavior.
    '''
    if labels is None:
        # NOTE(review): falls back to the global y_train (as the original
        # did); pass `labels` explicitly to decouple from the global.
        labels = np.unique(y_train).astype(int)
    cm = confusion_matrix(y_true, y_pred, labels=labels)
    cm_sum = np.sum(cm, axis=1, keepdims=True)
    cm_perc = cm / cm_sum.astype(float) * 100  # row-normalized percentages
    annot = np.empty_like(cm).astype(str)
    nrows, ncols = cm.shape
    for i in range(nrows):
        for j in range(ncols):
            p = cm_perc[i, j]
            # Always annotate the diagonal; off-diagonal only when >= 10%
            if i == j or p >= 10.:
                annot[i, j] = f'{p:.1f}'
            else:
                annot[i, j] = ''
    # Use the same int labels for both axes (was inconsistent before)
    cm = pd.DataFrame(cm, index=labels, columns=labels)
    cm.index.name = 'Actual'
    cm.columns.name = 'Predicted'
    fig, ax = plt.subplots(figsize=figsize)
    sns.heatmap(cm, cmap="Blues", annot=annot, fmt='', ax=ax)
plot_cm(y_true,y_hat,(40,30))
We start our data preprocessing by one-hot encoding the targets. This is necessary for training since we don't want the model to learn from the numerical labeling ("label encoding"). For fun, we observe what happens when we train off these label-encoded targets.
We also normalize the pixels in the image following the suggestion of subtracting $128$ and then dividing the result by $128$. This effectively transforms the pixels' range from $(0,255)$ to $(-1,1)$, which is easier for training a neural network.
We actually skip converting the images to grayscale since there is a lot of information provided by the color of the signs (blue vs red). Although it's arguable that color is unnecessary for recognition, the color feature likely would be helpful for the model to train off of.
from sklearn.preprocessing import OneHotEncoder
# One-hot encode the targets so the network doesn't treat class ids as ordinal
one_hot_encoder = OneHotEncoder(sparse=False)
# Fit on the full set of classes seen in training
one_hot_encoder.fit(np.unique(y_train).reshape(-1,1))
y_train_encoded = one_hot_encoder.transform(y_train.reshape(-1,1))
y_valid_encoded = one_hot_encoder.transform(y_valid.reshape(-1,1))
y_test_encoded = one_hot_encoder.transform(y_test.reshape(-1,1))
# Normalizing
def normalize_image_data(img_array, mean=128, std=128):
    '''Shift pixel values by `mean` and scale by `std`; the defaults map the
    0..255 pixel range to roughly -1..1.
    '''
    shifted = img_array - mean
    return shifted / std
We've actually designed a few model architectures going from simple to more complex (deeper and more parameters). We base each of the three different models on convolutional layers following a pattern of two convolutions before a max pool layer. The convolutions increase in complexity by adding more filters as we progress into the deeper layers. We apply batch normalization throughout to help speed up training and provide some regularization.
Note we also define a DefaultConv2D function to create convolutional layers using the ReLU activation function and the He normal initializer for the kernel since this has been shown to do well for the ReLU activation function
import tensorflow as tf
from tensorflow import keras
from functools import partial
# Conv2D factory with project defaults: 3x3 kernel, ReLU, SAME padding, and
# He-normal kernel init (a good fit for ReLU activations)
DefaultConv2D = partial(keras.layers.Conv2D, kernel_initializer='he_normal',
                        kernel_size=3, activation='relu', padding='SAME')
# Define different model architectures
def get_model_architectures():
    '''Return a dict of named, uncompiled Keras Sequential CNNs for the
    traffic-sign task (43 softmax outputs), plus a 'default' alias that
    points at the basic model.
    '''
    model_architectures = {
        'basic_cnn-32cov-64cov-dense':
            keras.models.Sequential([
                # Add convolution 2D
                DefaultConv2D(filters=64, kernel_size=3,activation='relu', padding="same",
                              kernel_initializer='he_normal', input_shape=[32, 32, 3]),
                keras.layers.BatchNormalization(),
                keras.layers.Conv2D(32,kernel_size=(3, 3), activation='relu'),
                keras.layers.BatchNormalization(),
                keras.layers.Conv2D(32,kernel_size=5,strides=2,padding='same',activation='relu'),
                keras.layers.MaxPooling2D((2, 2)),
                keras.layers.BatchNormalization(),
                # Add dropouts to the model
                keras.layers.Dropout(0.4),
                keras.layers.Conv2D(64, kernel_size=(3, 3), strides=2,padding='same', activation='relu'),
                keras.layers.MaxPooling2D(pool_size=(2, 2)),
                keras.layers.BatchNormalization(),
                keras.layers.Conv2D(64, kernel_size=(3, 3), strides=2,padding='same', activation='relu'),
                # Add dropouts to the model
                keras.layers.Dropout(0.4),
                keras.layers.Flatten(),
                keras.layers.Dense(units=128, activation='relu'),
                keras.layers.Dropout(0.4),
                keras.layers.Dense(units=64, activation='relu'),
                keras.layers.Dropout(0.4),
                # 43 traffic-sign classes
                keras.layers.Dense(units=43, activation='softmax'),
            ]),
        'complex_cnn-32cov-64cov-dense':
            keras.models.Sequential([
                keras.layers.BatchNormalization(),
                # NOTE(review): input_shape here sits on the *second* layer of
                # the Sequential model -- confirm whether it is honored, since
                # the basic model places it on the first layer.
                DefaultConv2D(filters=64, kernel_size=5, input_shape=[32, 32, 3]),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=32),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=32),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(0.4),
                DefaultConv2D(filters=64),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=64),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(0.4),
                keras.layers.Flatten(),
                keras.layers.Dense(units=128, activation='relu'),
                keras.layers.Dropout(0.4),
                keras.layers.Dense(units=64, activation='relu'),
                keras.layers.Dropout(0.4),
                keras.layers.Dense(units=43, activation='softmax'),
            ]),
        'complex_cnn-32cov-64cov-128cov-dense':
            keras.models.Sequential([
                keras.layers.BatchNormalization(),
                # NOTE(review): same input_shape placement caveat as above.
                DefaultConv2D(filters=64, kernel_size=5, input_shape=[32, 32, 3]),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=32),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=32),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(0.4),
                DefaultConv2D(filters=64),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=64),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(0.4),
                DefaultConv2D(filters=128),
                keras.layers.BatchNormalization(),
                DefaultConv2D(filters=128),
                keras.layers.MaxPooling2D(pool_size=2),
                keras.layers.BatchNormalization(),
                keras.layers.Dropout(0.4),
                keras.layers.Flatten(),
                keras.layers.Dense(units=128, activation='relu'),
                keras.layers.Dropout(0.4),
                keras.layers.Dense(units=64, activation='relu'),
                keras.layers.Dropout(0.4),
                keras.layers.Dense(units=43, activation='softmax'),
            ])
    }
    # Create a default from supplied architectures
    model_architectures['default'] = model_architectures.get('basic_cnn-32cov-64cov-dense')
    return model_architectures
Now we come to actually train the models and evaluate their performances. We will train and evaluate on three different architectures so we'll be testing all three with the test set. Note we observe a training without one-hot encoding but will not consider its results as valid for our final test.
For the other models, we train each CNN with a batch size of $128$ to help speed up convergence. We also set each to train for $150$ epochs, though we implement early stopping: after $8$ epochs with no improvement in the validation loss greater than $0.001$, the training will stop. This ensures we don't accidentally drastically overfit to our data. We generally observe the models stop training after about $50$ epochs.
Lastly, we include a checkpoint to only save the best weights of the models (based on the validation loss). This allows us to go back to a better model at the end if the early stopping callback stops the training. We also use Nesterov momentum with the Adam optimizer since Adam tends to do quite well in helping models to converge, especially when paired with (Nesterov) momentum.
def compile_model(model_type='default', loss_func='categorical_crossentropy'):
    '''Look up the requested architecture, compile it with the Nadam
    optimizer and the given loss function, and return the compiled model.

    An unknown `model_type` falls back to the 'default' architecture,
    with a printed warning.
    '''
    architectures = get_model_architectures()
    if model_type not in architectures:
        print(f'`{model_type}` is not defined; defaulting to `default` architecture')
        model_type = 'default'
    model = architectures[model_type]
    model.compile(
        loss=loss_func,
        optimizer='nadam',
        metrics=['accuracy'],
    )
    return model
Since evaluation is very similar for each model, we created a short function to plot the accuracy and loss for both the training and validation using the model's history.
def eval_model(model, model_history, X, y, show=True):
    '''Report a trained model's accuracy and loss on (X, y) and, when
    `show` is true, plot the training-history curves (loss and accuracy
    for both the training and validation sets).
    '''
    score = model.evaluate(X, y)
    print(f'Accuracy: {100*score[1]:.2f}% \nLoss: {score[0]:.2f}')
    if not show:
        return
    # One figure per metric: training curve then validation curve
    curve_specs = [
        ('loss', 'val_loss', 'Loss', 'upper right'),
        ('accuracy', 'val_accuracy', 'Accuracy', 'lower right'),
    ]
    for train_key, val_key, ylabel, legend_loc in curve_specs:
        plt.plot(model_history.history[train_key],
                 label=f'{ylabel} (training data)')
        plt.plot(model_history.history[val_key],
                 label=f'{ylabel} (validation data)')
        plt.ylabel(ylabel)
        plt.xlabel('Epoch')
        plt.legend(loc=legend_loc)
        plt.show()
Here we observe what happens when no one-hot encoding is done for a relatively simple CNN. The observant reader will see that this network does quite well with little training (less than 20 epochs) compared to the other models trained after one-hot encoding later in this notebook. These results seem dubious since the only real change is how the targets have been labeled, so these results will not be considered for the project.
# Sanity check: train the default model on integer labels (no one-hot)
# by using the sparse categorical cross-entropy loss
model = compile_model(loss_func='sparse_categorical_crossentropy')
history = model.fit(
    normalize_image_data(X_train),
    y_train,
    epochs=16,
    validation_data=(normalize_image_data(X_valid), y_valid),
    verbose=0
)
# Score on the validation set and show the training curves
eval_model(model, history, normalize_image_data(X_valid), y_valid)
We created an encapsulating method to help us select, compile, and train our different models. This allows us to train successive models and then evaluate each immediately after training before automatically moving on to the next model.
Note that while we train different models, we use the same batch size, number of epochs, and data (training, validation, and testing sets) for all the models.
def model_train(model_type, X, y, valid, batch_size=128, n_epochs=100, model_str='best_model'):
    '''
    Train a model of the given architecture on (X, y) for up to `n_epochs`
    with the given batch size. Saves checkpoints of the best weights (by
    validation loss) and stops early after 8 epochs with validation-loss
    improvement below 0.001. Returns the trained model and its history.

    Parameters
    ----------
    model_type : architecture key understood by `compile_model`
    X, y : training images and (encoded) targets
    valid : tuple (X_valid, y_valid) used for validation
    batch_size, n_epochs : training hyperparameters
    model_str : base name for the checkpoint files

    Returns
    -------
    (model, history): the model with the best checkpointed weights
    restored, and the Keras training History object.
    '''
    model_str = f'models/{model_type}/{model_str}'
    checkpoint_filepath = model_str+'-epoch_{epoch:02d}-val_loss_{val_loss:.2f}.ckpt'
    checkpoint_dir = os.path.dirname(checkpoint_filepath)
    # Keep only the weights with the smallest validation loss on disk
    save_best = tf.keras.callbacks.ModelCheckpoint(
        filepath=checkpoint_filepath,
        monitor='val_loss',
        save_weights_only=True,
        mode='auto',
        save_best_only=True
    )
    # Stop when val_loss hasn't improved by > 0.001 for 8 straight epochs
    stop_after_8_no_change = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        min_delta=0.001,
        patience=8,
        restore_best_weights=True
    )
    model = compile_model(model_type)
    X_valid, y_valid = valid
    # BUG FIX: previously fit on the module-level X_train / y_train_encoded,
    # silently ignoring the X and y arguments.
    history = model.fit(
        normalize_image_data(X),
        y,
        epochs=n_epochs,
        batch_size=batch_size,
        callbacks=[save_best, stop_after_8_no_change],
        validation_data=(normalize_image_data(X_valid), y_valid),
        verbose=0
    )
    # Reload the best checkpointed weights in case training ran past them
    latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
    model.load_weights(latest_checkpoint)
    return model, history
# Iterate through all the model architectures (except the `default` alias)
model_architectures_types = list(get_model_architectures().keys())
model_architectures_types.remove('default')
for model_type in model_architectures_types:
    print(f'========== Evaluation for `{model_type}` model ==========')
    model, history = model_train(
        model_type=model_type,
        X=X_train,
        y=y_train_encoded,
        valid=(X_valid,y_valid_encoded),
        batch_size=128,
        n_epochs=150
    )
    print('Validation data:')
    eval_model(model, history, normalize_image_data(X_valid), y_valid_encoded)
    print('Test data:')
    # No training-curve plots for the test set -- just the score
    eval_model(model, history, normalize_image_data(X_test), y_test_encoded, show=False);
    print('\n\n')
We observed that our basic_cnn-32cov-64cov-dense model performed the best; it had the smallest validation loss ($0.18$), the best validation accuracy (more than $94\%$), and performed well on the test set ($\sim92.5\%$ accuracy).
Though one of the more complex models performed slightly better than the simplest one trained, we're going to choose the basic model since we should be less likely to overfit using a simpler model.
Next, we'd like to find new traffic images to test on.
The following were images found via Google Street Map near Wattenscheid, Germany. Below, the images are shown with the matching class from signnames.csv and a link to the source image. Overall, the images seem relatively ideal, with decent resolution and very little background. However, the Yield sign is slightly askew, which could reduce the chances of it being classified correctly. We also note that although the background behind the signs doesn't seem particularly different from our training sets, it's possible that this will have an effect on the accuracy of the classification of these new images.
# Using this to find the sign names by class number
# (maps class id -> human-readable sign name from signnames.csv)
signnames_dict = pd.read_csv('signnames.csv').to_dict().get('SignName')
We'll rescale the images to 32x32 (RGB) so we can use them with our model.
from PIL import Image
from glob import glob
# Matches size of original data
width, height, channels = (32,32,3)
# Paths for all new images
img_paths = glob('data/new_images/*png')
img_list = []    # resized (32,32,3) pixel arrays
img_names = []   # class id (as a string) parsed from each filename
for orig_img_path in img_paths:
    img = Image.open(orig_img_path)
    img = img.resize((width, height))
    img = img.convert('RGB')  # drop any alpha channel
    img = np.asarray(img, dtype=np.int32)
    img_list.append(img)
    # Class id is encoded in the last two characters of the file stem
    img_names.append(orig_img_path.split('/')[-1][-6:-4])
# Plot out the rescaled images with their true class names
fig, axes = plt.subplots(1, len(img_list), figsize=(15,10))
for n,(i,ax) in enumerate(zip(img_list,axes)):
    ax.imshow(i)
    # BUG FIX: look up the sign name by the image's class id (img_names[n]),
    # not by the enumeration index `n`.
    class_name = signnames_dict[int(img_names[n])][:10]
    ax.set_title(f'#{img_names[n]} - {class_name}...')
# Getting the best model architectures with all the pre-trained weights
model_type = 'basic_cnn-32cov-64cov-dense'
model = get_model_architectures().get(model_type)
checkpoint_dir = f'models/{model_type}'
# Restore the most recent checkpoint saved during training
latest_checkpoint = tf.train.latest_checkpoint(checkpoint_dir)
model.load_weights(latest_checkpoint)
# See what the model guesses the new images are
X_new_test = np.array(img_list)
yhat = model.predict(normalize_image_data(X_new_test))
# Top-5 class ids per image, ordered most- to least-likely
best_guesses = np.argsort(yhat,axis=1)[:,:-6:-1]
best_guesses
Below we plot the original image with its class name and then a random image from the training set (with that class name) for each of the top five predictions by the model as well as the associated certainty.
figsize = (20,20)
fig = plt.figure(figsize=figsize)
grid = plt.GridSpec(
    6,      # Number of test images plus a header
    1 + 5,  # Image plus correct plus top 5 guesses
    wspace=0.0,
    hspace=0.2
)
for i,img in enumerate(X_new_test):
    # Test image with its true sign name in the first column
    ax = fig.add_subplot(grid[i,0])
    ax.imshow(img)
    ax.set_title(signnames_dict[int(img_names[i])])
    ax.get_xaxis().set_visible(False)
    ax.get_yaxis().set_visible(False)
    # One example image per top-5 guess; probabilities come from the
    # already-computed `yhat` (the per-image model.predict() and the
    # unused `y_likely` in the original were dead code).
    for j,guess in enumerate(best_guesses[i]):
        # Pick a random training example of the guessed class
        index = np.random.choice(np.where(y_train==guess)[0])
        ex_img = X_train[index]
        # Softmax certainty the model assigned to this guess
        softmax_val = yhat[i][guess]
        # Create images right of the test image
        ax = fig.add_subplot(grid[i,1+j])
        ax.imshow(ex_img)
        # Only display the first few letters and percent
        title = f'{softmax_val*100:.1f}% - {signnames_dict[guess][:20]}...'
        ax.set_title(title, fontsize=12)
        ax.get_xaxis().set_visible(False)
        ax.get_yaxis().set_visible(False)
As we can see, only the Speed limit (80km/h) sign was correctly identified with the model's highest certainty; beyond that, only the Yield image had the correct class within the top five guesses.
In general, the model tends to be relatively uncertain about its guesses, with the exception of the No vehicles image, where the model guessed incorrectly with almost $93\%$ certainty. This general uncertainty shows that the model is having a difficult time determining the signs. We also note that the model's predictions tend towards "reasonable" guesses, such as picking images of the same shape (round vs angled) or similar colors (blue vs red & white).
Overall, the performance of the basic_cnn-32cov-64cov-dense model was not as expected. This might mean that accuracy was not necessarily the metric to judge initial performance. Better metrics, in hindsight, would be to look at precision and recall and perhaps using a confusion matrix to visualize the performance per class. We likely need a more complex model to overcome this underfitting. This might mean going with a more complicated model even if accuracy is lower; performance per class could give a better idea of the true model performance in relation to real-world (desired) performance.